package org.zjvis.datascience.service.dataset;

import cn.hutool.core.util.CharUtil;
import cn.hutool.db.Entity;
import cn.hutool.db.meta.MetaUtil;
import cn.hutool.db.meta.Table;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.zjvis.datascience.common.constant.DatabaseConstant;
import org.zjvis.datascience.common.constant.NoticeConstant;
import org.zjvis.datascience.common.dto.*;
import org.zjvis.datascience.common.dto.dataset.*;
import org.zjvis.datascience.common.dto.user.UserDTO;
import org.zjvis.datascience.common.enums.NoticeTypeEnum;
import org.zjvis.datascience.common.enums.ProjectRoleAuthEnum;
import org.zjvis.datascience.common.exception.BaseErrorCode;
import org.zjvis.datascience.common.exception.DataScienceException;
import org.zjvis.datascience.common.model.ColumnSchema;
import org.zjvis.datascience.common.model.TableSchema;
import org.zjvis.datascience.common.util.*;
import org.zjvis.datascience.common.util.db.CryptoUtil;
import org.zjvis.datascience.common.util.db.JDBCUtil;
import org.zjvis.datascience.common.vo.dataset.DatasetIdAndProjectIdVO;
import org.zjvis.datascience.common.vo.dataset.HeadVO;
import org.zjvis.datascience.common.vo.dataset.PreviewDatasetVO;
import org.zjvis.datascience.common.vo.db.DBPreviewVO;
import org.zjvis.datascience.common.vo.db.DBWithBDNameVO;
import org.zjvis.datascience.common.vo.db.DbConnectionTestVO;
import org.zjvis.datascience.service.NoticeService;
import org.zjvis.datascience.service.dataprovider.GPDataProvider;
import org.zjvis.datascience.service.mapper.*;

import javax.sql.DataSource;
import java.sql.*;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.zjvis.datascience.common.constant.DatasetConstant.DEFAULT_ID_FIELD;

/**
 * @description Dataset 数据集管理Service
 * @date 2021-12-27
 */
@Service
public class DatasetService {

    private final static Logger logger = LoggerFactory.getLogger("DatasetService");

    @Autowired
    DatasetMapper datasetMapper;

    @Autowired
    GPDataProvider gpDataProvider;

    @Autowired
    UserMapper userMapper;

    @Autowired
    DatabaseMapper databaseMapper;

    @Autowired
    DatasetProjectMapper datasetProjectMapper;

    @Autowired
    SqlCategoryMapper sqlCategoryMapper;

    @Autowired
    DatasetCategoryMapper datasetCategoryMapper;

    @Lazy
    @Autowired
    NoticeService noticeService;

    @Autowired
    UserProjectMapper userProjectMapper;


    private static final int DATASET_NAME_MAX_AUTO_INCREMENT = 100;

    /**
     * 根据数据集id获取数据集
     *
     * @param datasetId
     * @return
     */
    public DatasetDTO queryById(Long datasetId) {
        return datasetMapper.queryById(datasetId);
    }

    /**
     * 根据分类id获取数据集
     *
     * @param categoryId
     * @return
     */
    public List<DatasetDTO> queryByCategoryId(Long categoryId) {
        return datasetMapper.queryByCategoryId(categoryId);
    }

    /**
     * 更新
     *
     * @param datasetDTO
     * @return
     */
    public Boolean update(DatasetDTO datasetDTO) {
        int res;
        try {
            res = datasetMapper.update(datasetDTO);
        } catch (DuplicateKeyException e) {
            throw new DataScienceException(BaseErrorCode.DATASET_NAME_DUPLICATE_ERROR);
        }
        if (res > 0) {
            return true;
        }
        return false;
    }

    public boolean exist(String name, Long categoryId, Long id) {
        DatasetDTO datasetDTO = new DatasetDTO();
        datasetDTO.setName(name);
        datasetDTO.setGmtCreator(id);
        datasetDTO.setCategoryId(categoryId);
        return datasetMapper.selectOne(datasetDTO) != null;
    }

    /**
     * 删除数据集
     *
     * @param datasetDTO
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public Boolean delete(DatasetDTO datasetDTO) {
        datasetDTO = datasetMapper.queryById(datasetDTO.getId());
        int res = datasetMapper.delete(datasetDTO);
        if (res > 0) {
            /* 删除GP中数据集源表 */
            Connection con = null;
            PreparedStatement ps = null;
            try {
                String json = datasetDTO.getDataJson();
                JSONObject jo = JSONObject.parseObject(json);
                String schema = jo.getString("schema");
                String table = jo.getString("table");
                String newTable = schema + "." + SqlUtil.formatPGSqlColName(table);
                con = gpDataProvider.getConn(DatabaseConstant.GREEN_PLUM_DATASET_ID);
                ps = con.prepareStatement(String.format(DatabaseConstant.SQL_DROP_TABLE, newTable));
                ps.execute();
            } catch (Exception e) {
                logger.error(e.getMessage());
                throw new DataScienceException(BaseErrorCode.DATASET_GP_DELETE_ERROR);
            } finally {
                JDBCUtil.close(con, ps, null);
            }

            return true;
        }
        return false;
    }


    public JSONObject queryDataById(DatasetIdAndProjectIdVO vo) {
        return queryDataById(vo.getDatasetId(), vo.getCurPage(), vo.getPageSize());
    }

    /**
     * 数据集表数据预览
     *
     * @param datasetId
     * @return
     */
    public JSONObject queryDataById(Long datasetId, Integer curPage, Integer pageSize) {
        Connection con = null;
        ResultSet rs = null;
        PreparedStatement ps = null;
        JSONObject jo = new JSONObject();
        JSONObject result = new JSONObject();
        result.put("code", 200);

        Page<JSONArray> page = new Page<>();
        page.setPageSize(pageSize == null ? 20 : pageSize);
        page.setCurPage(curPage == null ? 1 : curPage);
        int offset = (page.getCurPage() - 1) * page.getPageSize();

        try {
            //获取数据集信息
            DatasetDTO dataset = datasetMapper.queryById(datasetId);

            if (dataset == null) {
                result.put("code", 401);
                result.put("errMsg", "无权访问!,数据集id:" + datasetId);
                return result;
            }

            jo.put("name", dataset.getName());
            jo.put("createTime", dataset.formatTime(dataset.getGmtCreate()));

            /* 获取owner用户名 */
            UserDTO user = userMapper.selectByPrimaryKey(dataset.getUserId());
            String userName = "";
            if (user != null) {
                userName = user.getName();
            }
            jo.put("owner", userName);

            String dj = dataset.getDataJson();
            if (!JSONUtil.isJsonObj(dj)) {
                result.put("code", 401);
                result.put("errMsg", "关联数据不存在，无法找到原表");
                return result;
            }
            JSONObject config = JSONObject.parseObject(dj);

            String table = config.getString("table");
            String schema = config.getString("schema");
            String sortSql = " order by " + DEFAULT_ID_FIELD;
            String type = config.getString("type");
            String dataConfig = config.getString("dataConfig");

            jo.put("type", type);

            //判断是不是http数据，是的话对显示的url进行拼接
            if (type.equals("http")) {
                dataConfig = "/api/dataset/httpDataImport/" + dataset.getDataConfig();
                jo.put("dataConfig", dataConfig);
                String incrementalDataConfig = dataset.getIncrementalDataConfig();
                jo.put("incrementalDataConfig", incrementalDataConfig);
            }
            //数据库数据返回配置信息
            long userId = JwtUtil.getCurrentUserId();
            if (type.equals("table")) {
                jo.put("dataConfig",
                        JSONObject.parseObject(DataConfigEncryption.decrypt(Long.toString(userId),
                                dataset.getDataConfig())));
                String incrementalDataConfig = dataset.getIncrementalDataConfig();
                JSONObject incrementalJson = JSONObject.parseObject(incrementalDataConfig);
                jo.put("scheduleConfig",
                        JSONObject.parseObject(incrementalJson.getString("detail")));
            }

            //获取gp数据库中数据集库的连接（指定id为1）
            con = gpDataProvider.getConn(DatabaseConstant.GREEN_PLUM_DATASET_ID);

            // 先变更update以防统计大小时有新的update
            if (config.get("update") != null) {
                config.remove("update");
            }
            /* 获取数据总条数(表加索引总大小)，表大小用:pg_table_size,索引大小用：pg_indexes_size */
            ps = con.prepareStatement(String
                    .format(DatabaseConstant.GP_SELECT_TOTAL_RELATION_SIZE, schema,
                            SqlUtil.formatPGSqlColName(table)));
            rs = ps.executeQuery();
            while (rs.next()) {
                jo.put("size", rs.getString(1));
            }
            config.put("size", ConvertByteUtil.convertLong(jo.getString("size")));

            JDBCUtil.close(null, ps, rs);

            /* 获取数据总条数 */
            ps = con.prepareStatement(String
                    .format(DatabaseConstant.GP_SELECT_TABLE_DATA_COUNT, schema,
                            SqlUtil.formatPGSqlColName(table)));
            rs = ps.executeQuery();
            while (rs.next()) {
                jo.put("count", rs.getInt(1));
                page.setTotalElementsAndPage(rs.getInt(1));
            }
            //在dataset表中保存总行数
            Integer totalRow = config.getInteger("totalRow");
            if (totalRow == null || !totalRow.equals(jo.getInteger("count"))) {
                config.put("totalRow", jo.getInteger("count"));
            }
            dataset.setDataJson(config.toJSONString());
            datasetMapper.update(dataset);

            //取semantic信息
            JSONObject semantics = null;
            if (config.containsKey("columnMessage")) {
                List<JSONObject> columnMessage = config.getJSONArray("columnMessage").toJavaList(JSONObject.class);
                JSONObject fieldSemantics = new JSONObject();
                columnMessage.stream().forEach(msg -> {
                    fieldSemantics.put(msg.getString("name"), msg.getString("semantic"));
                });
                semantics = fieldSemantics;
            }

            JSONArray ja = new JSONArray();

            String limitSql = String
                    .format(DatabaseConstant.GP_LIMIT_OFFSET_SQL, page.getPageSize(), offset);
            String sql = String
                    .format(DatabaseConstant.GP_SELECT_SQL, schema, SqlUtil.formatPGSqlColName(table),
                            sortSql, limitSql);
            ps = con.prepareStatement(sql);
            rs = ps.executeQuery();
            ResultSetMetaData meta = rs.getMetaData();

            /* 生成head结构 */
            JSONArray heads = HeadUtil.wrapHead(meta, null, semantics, null);
            jo.put("head", heads);


            List<String> colNames = IntStream.range(1, meta.getColumnCount() + 1).mapToObj(i -> {
                try {
                    return meta.getColumnName(i);
                } catch (SQLException e) {
                }
                return StringUtils.EMPTY;
            }).filter(name -> !DEFAULT_ID_FIELD.equals(name)).collect(Collectors.toList());

            /* 生成data结构 */
            while (rs.next()) {
                JSONObject column = new JSONObject();
                for (String colName : colNames) {
                    if (DEFAULT_ID_FIELD.equals(colName)) {
                        continue;
                    }
                    column.put(colName, rs.getString(colName));
                }
                ja.add(column);
            }
            jo.put("data", ja);
        } catch (Exception e) {
            result.put("code", 500);
            result.put("errMsg", e.getMessage());
            logger.error(e.getMessage());
        } finally {
            JDBCUtil.close(con, ps, rs);
        }
        jo.put("page", page);
        result.put("data", jo);
        return result;
    }

    public List<DatasetNameTypeDTO> queryByProjectId(Long projectId) {
        List<DatasetDTO> datasets = datasetMapper.queryByProjectId(projectId);
        List<DatasetNameTypeDTO> res = new ArrayList<>();
        for (DatasetDTO dataset : datasets) {
            DatasetNameTypeDTO datasetNameDTO = DozerUtil.mapper(dataset, DatasetNameTypeDTO.class);
            try {
                JSONObject dataJson = JSONObject.parseObject(dataset.getDataJson());
                datasetNameDTO.setType(dataJson.getString("type"));
                datasetNameDTO
                        .setTableName(dataJson.getString("schema") + "." + dataJson.getString("table"));
            } catch (Exception e) {
                logger.error(e.getMessage());
            }

            res.add(datasetNameDTO);
        }
        return res;
    }

    public List<DatasetNameTypeDTO> queryByProjectId2(Long projectId) {
        List<DatasetDTO> datasets = datasetMapper.queryByProjectId(projectId);
        List<DatasetNameTypeDTO> res = new ArrayList<>();
        for (DatasetDTO dataset : datasets) {
            DatasetNameTypeDTO datasetNameDTO = DozerUtil.mapper(dataset, DatasetNameTypeDTO.class);
            try {
                JSONObject dataJson = JSONObject.parseObject(dataset.getDataJson());
                datasetNameDTO.setType(dataJson.getString("type"));
                datasetNameDTO
                        .setTableName(dataJson.getString("schema") + "." + dataJson.getString("table"));
            } catch (Exception e) {
                logger.error(e.getMessage());
            }

            res.add(datasetNameDTO);
        }
        return res;
    }

    public JSONObject databaseConnectionTest(DbConnectionTestVO conTest) {
        JSONObject jo = new JSONObject();
        String server = conTest.getServer();
        Integer port = conTest.getPort();
        String url = "";
        String sql = "";
        String dbType = conTest.getDatabaseType();
        dbType = dbType == null ? "" : dbType.trim().toLowerCase();
        try {
            switch (dbType) {
                case "mysql":
                case "rds-mysql":
                    url = String
                            .format(DatabaseConstant.MYSQL_JDBC_URL_WITHOUT_DATABASE, server, port);
                    sql = DatabaseConstant.MYSQL_SHOW_DATABASES;
                    break;
                case "oracle":
                    String connectType = conTest.getConnectType();
                    String connectValue = conTest.getConnectValue();
                    switch (connectType == null ? "" : connectType.toLowerCase()) {
                        case "servicename":
                            url = String
                                    .format(DatabaseConstant.ORACLE_JDBC_SERVICE_NAME_URL, server, port,
                                            connectValue);
                            break;
                        default:
                            url = String.format(DatabaseConstant.ORACLE_JDBC_SID_URL, server, port,
                                    connectValue);
                    }
                    sql = DatabaseConstant.ORACLE_SHOW_DATABASES;
                    break;
                default:
                    throw new DataScienceException(BaseErrorCode.DATASET_PREVIEW_UNSUPPORTED);
            }

            Connection con = JDBCUtil.getConnection(url, conTest.getUser(), conTest.getPassword());
            List<String> dbs = JDBCUtil.getAllDBs(con, sql);
            jo.put("code", 200);
            jo.put("data", dbs);
        } catch (SQLException e) {
            jo.put("code", 500);
            jo.put("errMsg", "无法连接服务器，请检查配置");
            logger.error(e.getMessage(), e);
        }

        return jo;
    }

    public JSONObject getTables(DBWithBDNameVO dBWithBDNameVO) {
        JSONObject jo = new JSONObject();
        Connection con = null;

        String server = dBWithBDNameVO.getServer();
        Integer port = dBWithBDNameVO.getPort();
        String database = dBWithBDNameVO.getDatabaseName();
        String url = "";
        String sql = "";
        String dbType = dBWithBDNameVO.getDatabaseType();
        dbType = dbType == null ? "" : dbType.trim().toLowerCase();

        switch (dbType) {
            case "mysql":
            case "rds-mysql":
                url = String.format(DatabaseConstant.MYSQL_JDBC_URL, server, port, database);
                sql = DatabaseConstant.MYSQL_SHOW_TABLES;
                break;
            case "oracle":
                sql = DatabaseConstant.ORACLE_SHOW_TABLES;
                String connectType = dBWithBDNameVO.getConnectType();
                String connectValue = dBWithBDNameVO.getConnectValue();
                switch (connectType == null ? "" : connectType.toLowerCase()) {
                    case "servicename":
                        url = String
                                .format(DatabaseConstant.ORACLE_JDBC_SERVICE_NAME_URL, server, port,
                                        connectValue);
                        break;
                    default:
                        url = String.format(DatabaseConstant.ORACLE_JDBC_SID_URL, server, port,
                                connectValue);
                }
                break;
            default:
                throw new DataScienceException(BaseErrorCode.DATASET_PREVIEW_UNSUPPORTED);
        }

        try {
            con = JDBCUtil
                    .getConnection(url, dBWithBDNameVO.getUser(), dBWithBDNameVO.getPassword());
            List<String> dbs = JDBCUtil.getAllDBs(con, sql);
            jo.put("data", dbs);
            jo.put("code", 200);
        } catch (SQLException e) {
            jo.put("code", 500);
            jo.put("errMsg", e.getCause().getMessage());
            logger.error(e.getMessage());
        } finally {
            JDBCUtil.close(con, null, null);
        }
        return jo;
    }

    public PreviewDatasetVO queryTableData(DBPreviewVO dBPreviewVO) {
        String dbType = dBPreviewVO.getDatabaseType();
        dbType = dbType == null ? "" : dbType.trim().toLowerCase();

        switch (dbType) {
            case "mysql":
            case "oracle":
            case "rds-mysql":
                break;
            default:
                throw new DataScienceException(BaseErrorCode.DATASET_PREVIEW_UNSUPPORTED);
        }

        String url = JDBCUtil
                .getUrl(dBPreviewVO.getServer(), dBPreviewVO.getPort(), dBPreviewVO.getDatabaseName(),
                        dbType, dBPreviewVO.getConnectType(), dBPreviewVO.getConnectValue());
        DataSource ds = JDBCUtil
                .getDataSource(url, dBPreviewVO.getUser(), dBPreviewVO.getPassword());
        Table tableMeta = MetaUtil.getTableMeta(ds, dBPreviewVO.getTableName());
        List<Entity> data;

        try {
            data = JDBCUtil.page(ds, tableMeta, 0, 10, dbType, true);
        } catch (SQLException e) {
            logger.error(e.getMessage());
            throw new DataScienceException(BaseErrorCode.DATASET_PREVIEW_SQL_ERROR, e);
        }
        PreviewDatasetVO vo = PreviewDatasetVO
                .of(data, tableMeta.getTableName(), tableMeta, dbType);
        if (dBPreviewVO.getBaseTableConfigVO() != null) {
            Map<String, HeadVO> map = new HashMap<>();
            for (HeadVO headvo : vo.getHead()) {
                map.put(headvo.getName(), headvo);
            }
            List<HeadVO> newHead = new ArrayList<>();
            List<Entity> newData = vo.getData();
            for (DatasetColumnDTO datasetColumnDTO : dBPreviewVO.getBaseTableConfigVO().getData()) {
                if (datasetColumnDTO.getImportColumn()) {
                    newHead.add(map.get(datasetColumnDTO.getName()));
                }
                if (datasetColumnDTO.getDataMaskingType() != null) {
                    for (int i = 0; i < newData.size(); i++) {
                        Entity entity = newData.get(i);
                        String currentName = datasetColumnDTO.getName();
                        entity.set(currentName, getNewValue(datasetColumnDTO.getDataMaskingType(),
                                entity.get(currentName).toString()));
                        newData.set(i, entity);
                    }
                }
            }
            vo.setHead(newHead);
            vo.setData(newData);
        }

        return vo;
    }

    public String getNewValue(String dataMaskingType, String oldValue) {
        String newValue = "";
        switch (dataMaskingType) {
            case "md5":
                try {
                    newValue = CryptoUtil.hmacMD5(oldValue);
                } catch (Exception e) {
//          log.error("md5 desensitization error");
                }
                break;
            case "sha1":
                try {
                    newValue = CryptoUtil.hmacSHA1(oldValue);
                } catch (Exception e) {
//          log.error("sha1 desensitization error");
                }
                break;
            case "mosaic":
                newValue = CryptoUtil.stringMosaic(oldValue);
                break;
            default:
                break;
        }
        return newValue;
    }

    public List<String> getScheduledHead(DBPreviewVO dBPreviewVO) throws SQLException {
        List<String> typeList = new ArrayList<>(Arrays
                .asList("TINYINT", "SMALLINT", "MEDIUMINT", "INT", "INTEGER", "BIGINT", "FLOAT",
                        "DOUBLE", "DECIMAL",
                        "DATE", "TIME", "YEAR", "DATETIME", "TIMESTAMP"));
        List<String> result = new ArrayList<>();
        String dbType = dBPreviewVO.getDatabaseType();
        dbType = dbType == null ? "" : dbType.trim().toLowerCase();

        switch (dbType) {
            case "mysql":
            case "oracle":
            case "rds-mysql":
                break;
            default:
                throw new DataScienceException(BaseErrorCode.DATASET_PREVIEW_UNSUPPORTED);
        }
        String url = JDBCUtil
                .getUrl(dBPreviewVO.getServer(), dBPreviewVO.getPort(), dBPreviewVO.getDatabaseName(),
                        dbType, dBPreviewVO.getConnectType(), dBPreviewVO.getConnectValue());
        Connection con = JDBCUtil
                .getConnection(url, dBPreviewVO.getUser(), dBPreviewVO.getPassword());
        TableSchema tableSchema = JDBCUtil.getTableSchema(con, dBPreviewVO.getTableName());
        if (tableSchema == null) {
            return new ArrayList<>();
        }
        for (ColumnSchema columnSchema : tableSchema.getColumns()) {
            String type = columnSchema.getColumnType();
            if (typeList.contains(type.toUpperCase())) {
                result.add(columnSchema.getColumnName());
            }
        }
        result.add("_rowid");
        return result;
    }

    public long insert(DatasetDTO dto) {
        String datasetName = dto.getName();
        int i = 1;
        for (; i <= DATASET_NAME_MAX_AUTO_INCREMENT; i++) {
            try {
                datasetMapper.save(dto);
                break;
            } catch (DuplicateKeyException e) {
                dto.setName(autoIncrementDatasetName(datasetName, i));
            }
        }
        if (i > DATASET_NAME_MAX_AUTO_INCREMENT) {
            throw DataScienceException
                    .of(BaseErrorCode.DATASET_NAME_DUPLICATE_ERROR, "retry times reach limit",
                            datasetName);
        }
        return dto.getId();
    }

    public Long saveDataset(Long uid, Long categoryId, String datasetName, DatasetJsonInfo dj) {
        DatasetDTO datasetDTO = buildDataset(uid, categoryId, datasetName, dj);
        return insert(datasetDTO);
    }

    public Long saveDataset(Long uid, Long categoryId, String datasetName, DatasetJsonInfo dj,
                            String url, String status) {
        DatasetDTO datasetDTO = buildDataset(uid, categoryId, datasetName, dj, url, status);
        return insert(datasetDTO);
    }

    public String autoIncrementDatasetName(String datasetName, int i) {
        return datasetName + CharUtil.UNDERLINE + i;
    }

    public void checkAuth(long datasetId, boolean checkSystemAdmin) {
        long userId = JwtUtil.getCurrentUserId();
        UserDTO user = JwtUtil.getCurrentUserDTO();
        //系统管理员可以访问所有数据集
        if (user.getRole() == 1 && checkSystemAdmin) {
            return;
        }
        //单纯数据集接口仅拥有者可以读写
        if (datasetMapper.checkAuth(datasetId, userId) == 0) {
            throw DataScienceException.of(BaseErrorCode.UNAUTHORIZED, "数据集id:" + datasetId);
        }
    }

    public String buildDataJson(String importType, String targetTableName) {
        DataInfo di = DataInfo.builder()
                .schema(DatabaseConstant.GREEN_PLUM_DEFAULT_SCHEMA)
                .table(targetTableName)
                .type(importType)
                .build();
        return JSON.toJSONString(di);
    }

    public DatasetDTO buildDataset(String importType, String datasetName, long categoryId, long uid,
                                   String targetTableName) {
        return DatasetDTO.builder()
                .dataJson(buildDataJson(importType, targetTableName))
                .categoryId(categoryId)
                .userId(uid)
                .name(datasetName)
                .gmtCreator(uid)
                .gmtModifier(uid)
                .build();
    }

    public DatasetDTO buildDataset(Long uid, Long categoryId, String datasetName,
                                   DatasetJsonInfo dj) {
        return DatasetDTO.builder()
                .dataJson(JSON.toJSONString(dj))
                .categoryId(categoryId)
                .userId(uid)
                .name(datasetName)
                .gmtCreator(uid)
                .gmtModifier(uid)
                .build();
    }

    public DatasetDTO buildDataset(Long uid, Long categoryId, String datasetName,
                                   DatasetJsonInfo dj, String url, String status) {
        return DatasetDTO.builder()
                .dataJson(JSON.toJSONString(dj))
                .categoryId(categoryId)
                .userId(uid)
                .name(datasetName)
                .dataConfig(url)
                .incrementalDataConfig(status)
                .gmtCreator(uid)
                .gmtModifier(uid)
                .build();
    }

    public DatasetDTO buildDataset(Long uid, Long categoryId, String datasetName,
                                   DatasetJsonInfo dj, DataConfigDTO dataConfigDTO,
                                   DatasetConfigInfo datasetConfigInfo) {
        String dataConfig = "";
        try {
            //加密数据库配置信息
            dataConfig = DataConfigEncryption
                    .encrypt(String.valueOf(uid), JSON.toJSONString(dataConfigDTO));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return DatasetDTO.builder()
                .dataJson(JSON.toJSONString(dj))
                .dataConfig(dataConfig)
                .incrementalDataConfig(JSON.toJSONString(datasetConfigInfo))
                .categoryId(categoryId)
                .userId(uid)
                .name(datasetName)
                .gmtCreator(uid)
                .gmtModifier(uid)
                .build();
    }

    /**
     * 根据sql查询用户数据库
     *
     * @param sql
     * @return
     */
    public JSONObject queryDataBySql(String sql, Connection con) throws SQLException {
        ResultSet rs = null;
        PreparedStatement ps = null;
        JSONObject jo = new JSONObject();
        JSONObject result = new JSONObject();
        result.put("code", 200);

        try {
            //获取gp数据库中数据集库的连接（指定id为1）
//      con = gpDataProvider.getConn(DatabaseConstant.GREEN_PLUM_DATASET_ID);
            ps = con.prepareStatement(sql);
            rs = ps.executeQuery();
            JSONArray ja = new JSONArray();
            JSONArray heads = new JSONArray();
            /* 获取limit条数据 */
            String limitSql = String
                    .format(DatabaseConstant.GP_LIMIT_SQL, DatabaseConstant.GP_PREVIEW_COUNT);
            ResultSetMetaData meta = rs.getMetaData();
            int colCount = meta.getColumnCount();
            /* 生成head结构 */
            List<String> colNames = new ArrayList<>();
            for (int i = 1; i < colCount + 1; i++) {
                if (DEFAULT_ID_FIELD.equals(meta.getColumnName(i))) {
                    continue;
                }
                JSONObject head = new JSONObject();
                String name = meta.getColumnName(i);
                head.put("name", name);
                head.put("type", SqlUtil.changeType(meta.getColumnTypeName(i)));
                heads.add(head);
                colNames.add(name);
            }
            jo.put("head", heads);
            /* 生成data结构 */
            while (rs.next()) {
                JSONObject column = new JSONObject();
                for (String colName : colNames) {
                    if (DEFAULT_ID_FIELD.equals(colName)) {
                        continue;
                    }
                    column.put(colName, rs.getString(colName));
                }
                ja.add(column);
            }
            jo.put("data", ja);
        } finally {
            JDBCUtil.close(con, ps, rs);
        }
        result.put("data", jo);
        return result;
    }

    public String getMaxValue(DataSource dataSource, String tableName, String incrementColumn)
            throws SQLException {
        if (incrementColumn == null || incrementColumn.length() == 0) {
            incrementColumn = "_rowid";
        }
        String sql = "select max(" + incrementColumn + ") from `" + tableName + "`";
        Connection conn = dataSource.getConnection();
        JSONObject jsonObject = queryDataBySql(sql, conn);
        String s = JSONObject.parseObject(jsonObject.getString("data")).getString("data");
        return JSONObject.parseObject(JSONObject.parseArray(s).get(0).toString())
                .get("max(" + incrementColumn + ")").toString();
    }

    public String getMaxValue(String tableName, String incrementColumn) throws Exception {
        if (incrementColumn == null || incrementColumn.length() == 0) {
            incrementColumn = "_rowid";
        }
        String sql = "select max(" + incrementColumn + ") from dataset." + tableName;
        Connection conn = gpDataProvider.getConn(DatabaseConstant.GREEN_PLUM_DATASET_ID);
        JSONObject jsonObject = queryDataBySql(sql, conn);
        String s = JSONObject.parseObject(jsonObject.getString("data")).getString("data");
        return JSONObject.parseObject(JSONObject.parseArray(s).get(0).toString()).get("max")
                .toString();
    }

    public boolean checkSchedule(DataSource dataSource, String tableName) {
        String sql = "select _rowid from `" + tableName + "`";
        Connection conn = null;
        PreparedStatement ps = null;
        try {
            conn = dataSource.getConnection();
            ps = conn.prepareStatement(sql);
            ps.executeQuery();
        } catch (SQLException throwables) {
            return false;
        } finally {
            JDBCUtil.close(conn, ps, null);
        }
        return true;
    }

    public String getDatabaseType(String dbType) {
        dbType = dbType == null ? "" : dbType.trim().toLowerCase();

        switch (dbType) {
            case "mysql":
            case "oracle":
                break;
            case "rds-mysql":
                //后续rds-mysql的处理和mysql一样，所以此处转为mysql
                dbType = "mysql";
                break;
            default:
                throw new DataScienceException(BaseErrorCode.DATASET_PREVIEW_UNSUPPORTED);
        }
        return dbType;
    }

    public String getCrontab(DatasetScheduleDTO datasetScheduleDTO) {
        StringBuilder stringBuilder = new StringBuilder();
        String type = datasetScheduleDTO.getScheduleType();
        switch (type) {
            case "day":
                stringBuilder.append("0 ")
                        .append(datasetScheduleDTO.getMinuteOfHour())
                        .append(" ").append(datasetScheduleDTO.getHourOfDay())
                        .append(" * * ?");
                break;
            case "week":
                stringBuilder.append("0 ")
                        .append(datasetScheduleDTO.getMinuteOfHour())
                        .append(" ").append(datasetScheduleDTO.getHourOfDay())
                        .append(" ? * ").append(datasetScheduleDTO.getDaysOfWeek());
                break;
            case "hour":
                stringBuilder.append("0 ")
                        .append(datasetScheduleDTO.getMinuteOfHour())
                        .append(" 0/").append(datasetScheduleDTO.getIntervalOfHour())
                        .append(" * * ?");
                break;
        }
        return stringBuilder.toString();
    }

    public Boolean updateConfig(DatasetDTO datasetDTO) {
        if (datasetDTO.getDataConfig() != null) {
            String dataConfig = datasetDTO.getDataConfig();
            String userId = String.valueOf(datasetDTO.getUserId());
            String encryptedDataConfig = "";
            try {
                encryptedDataConfig = DataConfigEncryption.encrypt(userId, dataConfig);
            } catch (Exception e) {
                e.printStackTrace();
            }
            datasetDTO.setDataConfig(encryptedDataConfig);
        }
        return update(datasetDTO);
    }

    public Boolean updateConfig(DatasetUpdateConfigDTO datasetUpdateConfigDTO) {
        DataConfigDTO dataConfigDTO = null;
        DatasetConfigInfo datasetConfigInfo = null;
        DataSource dataSource;
        if (datasetUpdateConfigDTO.getServer() != null) {
            String dbType = datasetUpdateConfigDTO.getDatabaseType();
            dbType = dbType == null ? "" : dbType.trim().toLowerCase();

            switch (dbType) {
                case "mysql":
                case "oracle":
                case "rds-mysql":
                    break;
                default:
                    throw new DataScienceException(BaseErrorCode.DATASET_PREVIEW_UNSUPPORTED);
            }
            String url = JDBCUtil
                    .getUrl(datasetUpdateConfigDTO.getServer(), datasetUpdateConfigDTO.getPort(),
                            datasetUpdateConfigDTO.getDatabaseName(),
                            dbType, datasetUpdateConfigDTO.getConnectType(),
                            datasetUpdateConfigDTO.getConnectValue());
            dataConfigDTO = DataConfigDTO.builder()
                    .databaseType(datasetUpdateConfigDTO.getDatabaseType())
                    .server(datasetUpdateConfigDTO.getServer())
                    .port(datasetUpdateConfigDTO.getPort())
                    .user(datasetUpdateConfigDTO.getUser())
                    .password(datasetUpdateConfigDTO.getPassword())
                    .databaseName(datasetUpdateConfigDTO.getDatabaseName())
                    .tableName(datasetUpdateConfigDTO.getTableName())
                    .url(url)
                    .build();
        }
        DatasetDTO datasetDTO1 = queryById(datasetUpdateConfigDTO.getId());
        String decrypt = "";
        try {
            decrypt = DataConfigEncryption.decrypt(datasetUpdateConfigDTO.getUserId().toString(),
                    datasetDTO1.getDataConfig());
        } catch (Exception e) {
            logger.error("decrypt error!");
        }
        JSONObject dataConfigJson = JSON.parseObject(decrypt);
        dataSource = JDBCUtil.getDataSource(JDBCUtil.getUrl(dataConfigJson.getString("server"),
                Integer.parseInt(dataConfigJson.getString("port")),
                dataConfigJson.getString("databaseName"),
                "mysql", datasetUpdateConfigDTO.getConnectType(),
                datasetUpdateConfigDTO.getConnectValue()),
                dataConfigJson.getString("user"), dataConfigJson.getString("password"));
        if (datasetUpdateConfigDTO.getDatasetScheduleConfig() != null) {
            DatasetDTO savedDataset = datasetMapper.queryById(datasetUpdateConfigDTO.getId());
            if (savedDataset == null) {
                return false;
            }
            String lastValue;
            JSONObject jsonObject = JSONObject.parseObject(savedDataset.getIncrementalDataConfig());
            lastValue = jsonObject.getString("lastValue");
            String incrementColumnSaved = jsonObject.getString("incrementColumn");
            if (incrementColumnSaved == null) {
                incrementColumnSaved = "";
            }
            DatasetScheduleDTO datasetScheduleDTO = datasetUpdateConfigDTO
                    .getDatasetScheduleConfig();
            if (datasetScheduleDTO == null || !datasetScheduleDTO.getNeedSchedule()) {
                datasetConfigInfo = DatasetConfigInfo.builder()
                        .needSchedule(false)
                        .build();
            } else {
                if (lastValue == null || !incrementColumnSaved.equals(datasetScheduleDTO.getIncrementColumn())) {
                    try {
                        lastValue = getMaxValue(dataSource, datasetUpdateConfigDTO.getTableName(),
                                datasetScheduleDTO.getIncrementColumn());
                    } catch (SQLException e) {
                        throw new DataScienceException(BaseErrorCode.DATASET_IMPORT_ERROR);
                    }
                }
                datasetConfigInfo = DatasetConfigInfo.builder()
                        .needSchedule(true)
                        .crontab(getCrontab(datasetScheduleDTO))
                        .incrementColumn(datasetScheduleDTO.getIncrementColumn())
                        .lastValue(lastValue)
                        .detail(datasetScheduleDTO)
                        .build();
            }
        }
        DatasetDTO datasetDTO = new DatasetDTO();
        datasetDTO.setId(datasetUpdateConfigDTO.getId());
        datasetDTO.setUserId(datasetUpdateConfigDTO.getUserId());
        if (dataConfigDTO != null) {
            datasetDTO.setDataConfig(JSON.toJSONString(dataConfigDTO));
        }
        if (datasetConfigInfo != null) {
            datasetDTO.setIncrementalDataConfig(JSON.toJSONString(datasetConfigInfo));
        }

        return updateConfig(datasetDTO);
    }

    public JSONObject checkDatasetName(Long categoryId, String datasetName) {
        //int count = datasetMapper.countByCategoryIdAndDatasetName(categoryId,datasetName);
        String tips = "";
        JSONObject res = new JSONObject();
        DatasetDTO dataset = datasetMapper.queryByCategoryIdAndDatasetName(categoryId, datasetName);
        boolean exists = dataset == null ? false : true;

        if (exists) {
            DatasetCategoryDTO dc = datasetCategoryMapper.queryById(categoryId);

            List<DatasetProjectDTO> datasetProjectList = datasetProjectMapper
                    .queryByDatasetId(dataset.getId());
            if (CollectionUtils.isNotEmpty(datasetProjectList)) {
                tips = dc.getName() + " 中已存在同名数据，并且该数据集已被加载到项目中。若替换不仅会覆盖其当前内容并且会对加载该数据集的项目造成一定影响。";
            } else {
                tips = dc.getName() + " 中已存在同名数据，若替换则会覆盖其当前内容。";
            }
        }
        res.put("exists", exists);
        res.put("tips", tips);
        return res;
    }

    public DatasetDTO queryByCategoryIdAndDatasetName(Long categoryId, String datasetName) {
        return datasetMapper.queryByCategoryIdAndDatasetName(categoryId, datasetName);
    }

    /**
     * 删除数据集或转移归属人
     *
     * @param datasetDTO
     * @param transferUserId
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean deleteOrTransferUser(DatasetDTO datasetDTO, Long transferUserId) {
        String currentUserName = JwtUtil.getCurrentUserDTO().getName();
        datasetDTO = datasetMapper.queryById(datasetDTO.getId());
        if (transferUserId != null && transferUserId > 0) {
            //数据集所有权转移
            DatasetCategoryDTO dto = datasetCategoryMapper
                    .queryByUserIdAndName(transferUserId, "默认分类");
            if (dto == null) {
                dto = new DatasetCategoryDTO();
                dto.setName("默认分类");
                dto.setUserId(transferUserId);
                dto.setGmtCreator(transferUserId);
                dto.setGmtModifier(transferUserId);
                datasetCategoryMapper.save(dto);
            }
            String oldName = datasetDTO.getName();
            datasetDTO.setUserId(transferUserId);
            datasetDTO.setCategoryId(dto.getId());
            datasetDTO.setName(datasetDTO.getName() + "_" + currentUserName + "_" + datasetDTO.getId());
            datasetMapper.update(datasetDTO);
            List<DatasetProjectDTO> datasetProjects = datasetProjectMapper
                    .queryByDatasetId(datasetDTO.getId());
            List<UserProjectDTO> userProjects = userProjectMapper.listByUserId(transferUserId);
            HashSet<Long> projectIds = new HashSet<>();
            if (userProjects != null) {
                for (UserProjectDTO userProject : userProjects) {
                    //获取权限超访客的用户id
                    if (userProject.getRoleId() < ProjectRoleAuthEnum.VISITOR.getValue()) {
                        projectIds.add(userProject.getProjectId());
                    }
                }
            }

            if (datasetProjects != null) {
                for (DatasetProjectDTO datasetProject : datasetProjects) {
                    if (projectIds.contains(datasetProject.getProjectId())) {
                        DatasetProjectDTO newDatasetProject = new DatasetProjectDTO();
                        newDatasetProject.setId(datasetProject.getId());
                        newDatasetProject.setGmtCreator(transferUserId);
                        newDatasetProject.setGmtModifier(transferUserId);
                        datasetProjectMapper.updateByPrimaryKeySelective(newDatasetProject);
                    } else {
                        datasetProjectMapper.deleteByPrimaryKey(datasetProject.getId());
                    }
                }
            }
            //发送消息
            sendTransferNotice(transferUserId, oldName, datasetDTO.getName());
            return true;
        } else {
            //删除
            int res = datasetMapper.delete(datasetDTO);
            if (res > 0) {
                /* 删除GP中数据集源表 */
                Connection con = null;
                PreparedStatement ps = null;
                try {
                    String json = datasetDTO.getDataJson();
                    JSONObject jo = JSONObject.parseObject(json);
                    String schema = jo.getString("schema");
                    String table = jo.getString("table");
                    String newTable = schema + "." + SqlUtil.formatPGSqlColName(table);
                    con = gpDataProvider.getConn(DatabaseConstant.GREEN_PLUM_DATASET_ID);
                    ps = con
                            .prepareStatement(String.format(DatabaseConstant.SQL_DROP_TABLE, newTable));
                    ps.execute();
                } catch (Exception e) {
                    logger.error(e.getMessage());
                    throw new DataScienceException(BaseErrorCode.DATASET_GP_DELETE_ERROR);
                } finally {
                    JDBCUtil.close(con, ps, null);
                }
                return true;
            }
        }

        return false;
    }

    /**
     * 发送给移除给数据集的的用户
     *
     * @param transferUserId
     */
    public void sendTransferNotice(Long transferUserId, String oldName, String newName) {
        UserDTO currentUser = JwtUtil.getCurrentUserDTO();
        String title = NoticeConstant.DATASET_TRANSFER_TITLE;
        String content = String
                .format(NoticeConstant.DATASET_TRANSFER_CONTENT,
                        StringUtil.addHtmlBoldLabel(currentUser.getName()),
                        StringUtil.addHtmlBoldLabel(oldName),
                        StringUtil.addHtmlBoldLabel("默认分类"),
                        StringUtil.addHtmlBoldLabel(newName));

        noticeService.saveAndSendToUser(transferUserId, title, content, null,
                NoticeTypeEnum.system.getType(), currentUser.getId());
    }

//  public LocalDateTime queryLastGmtModify(String datasetName) {
//    DatasetDTO dataset = datasetMapper.queryLastGmtModify(datasetName);
//    return dataset.getGmtModify();
//  }

}

@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
class DataInfo {

    private String schema;
    private String table;
    private String type;

}
